package com.ctw.tinyservices.id.core.snowflake.holder;

import com.ctw.tinyservices.id.common.utils.LogUtils;
import com.ctw.tinyservices.id.core.snowflake.exception.CheckLastTimeException;
import com.google.common.collect.Maps;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * @author TongWei.Chen 2022/4/8 14:30
 *
 * Zookeeper
 **/
public class ZookeeperIdSnowflakeHolder extends AbstractIdSnowflakeHolder {
    // 保存自身的key  ip:port-000000001
    private String zkAddressNode;
    // 保存所有数据持久的节点
    private static final String PATH_FOREVER = "/tinyservices_id/snowflake/forever";
    private String zkConnectionAddr;
    private CuratorFramework curator;

    private volatile static ZookeeperIdSnowflakeHolder singleton;
    public static ZookeeperIdSnowflakeHolder getInstance(Integer port, String zkConnectionAddr) {
        if (singleton == null) {
            synchronized (ZookeeperIdSnowflakeHolder.class) {
                if (singleton == null) {
                    singleton = new ZookeeperIdSnowflakeHolder(port, zkConnectionAddr);
                }
            }
        }
        return singleton;
    }

    private ZookeeperIdSnowflakeHolder(int port, String zkConnectionAddr) {
        super(port);
        this.zkConnectionAddr = zkConnectionAddr;
    }

    @Override
    protected void doProcess() throws Exception {
        curator = createWithOptions(zkConnectionAddr, new RetryUntilElapsed(1000, 4), 5000, 6000);
        curator.start();
        Stat stat = curator.checkExists().forPath(PATH_FOREVER);
        if (stat == null) {
            // 不存在根节点,机器第一次启动,创建/tinyservices_id/snowflake/forever/ip:port-000000000,并上传数据
            zkAddressNode = createNode();
        } else {
            // ip:port->00001
            Map<String, Integer> nodeMap = Maps.newHashMap();
            // ip:port->(ip port-000001)
            Map<String, String> realNode = Maps.newHashMap();
            // 存在根节点,先检查是否有属于自己的根节点
            List<String> keys = curator.getChildren().forPath(PATH_FOREVER);
            for (String key : keys) {
                String[] nodeKey = key.split("-");
                realNode.put(nodeKey[0], key);
                nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1]));
            }
            Integer workerId = nodeMap.get(listenAddress);
            if (workerId != null) {
                // 有自己的节点,zkAddressNode=ip:port
                zkAddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress);
                this.workerId = workerId;
                if (! checkInitTimeStamp(curator, zkAddressNode)) {
                    throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
                }
                LogUtils.info(ZookeeperIdSnowflakeHolder.class, "[Old NODE]find forever node have this endpoint ip-{} port-{} workid-{} childnode and start SUCCESS", ip, port, workerId);
            } else {
                // 表示新启动的节点,创建持久节点,不用check时间
                String newNode = createNode();
                zkAddressNode = newNode;
                String[] nodeKey = newNode.split("-");
                this.workerId = Integer.parseInt(nodeKey[1]);
                LogUtils.info(ZookeeperIdSnowflakeHolder.class, "[New NODE]can not find node on forever node that endpoint ip-{} port-{} workid-{},create own node on forever node and start SUCCESS ", ip, port, workerId);
            }
        }
    }

    private boolean checkInitTimeStamp(CuratorFramework curator, String zk_AddressNode) throws Exception {
        byte[] bytes = curator.getData().forPath(zk_AddressNode);
        Endpoint endPoint = deBuildData(new String(bytes));
        // 该节点的时间不能小于最后一次上报的时间
        return !(endPoint.getTimestamp() > System.currentTimeMillis());
    }

    private CuratorFramework createWithOptions(String zkConnectionAddr, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
        return CuratorFrameworkFactory.builder().connectString(zkConnectionAddr)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                .build();
    }

    /**
     * 创建持久顺序节点, 并把节点数据放入 value
     *
     * @return
     * @throws Exception
     */
    private String createNode() throws Exception {
        try {
            return curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(PATH_FOREVER + "/" + listenAddress + "-", buildData().getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            LogUtils.error(ZookeeperIdSnowflakeHolder.class, "create node error msg {} ", e.getMessage());
            throw e;
        }
    }

    @Override
    protected void doUpdateMaxTimestamp() throws Exception {
        curator.setData().forPath(zkAddressNode, buildData().getBytes(StandardCharsets.UTF_8));
    }
}
